{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>\n",
    "\n",
    "<i>Licensed under the MIT License.</i>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Building a Real-time Recommendation API\n",
    "\n",
    "This reference architecture shows the full lifecycle of building a recommendation system. It walks through the creation of appropriate azure resources, training a recommendation model using a Virtual Machine or Databricks, and deploying it as an API. It uses Azure Cosmos DB, Azure Machine Learning, and Azure Kubernetes Service. \n",
    "\n",
    "This architecture can be generalized for many recommendation engine scenarios, including recommendations for products, movies, and news. \n",
    "### Architecture\n",
    "![architecture](https://recodatasets.z20.web.core.windows.net/images/reco-arch.png \"Architecture\")\n",
    "\n",
    "**Scenario**: A media organization wants to provide movie or video recommendations to its users. By providing personalized recommendations, the organization meets several business goals, including increased click-through rates, increased engagement on site, and higher user satisfaction.\n",
    "\n",
    "In this reference, we train and deploy a real-time recommender service API that can provide the top 10 movie recommendations for a given user. \n",
    "\n",
    "### Components\n",
    "This architecture consists of the following key components:\n",
    "* [Azure Databricks](https://docs.microsoft.com/en-us/azure/azure-databricks/what-is-azure-databricks)<sup>1)</sup> is used as a development environment to prepare input data and train the recommender model on a Spark cluster. Azure Databricks also provides an interactive workspace to run and collaborate on notebooks for any data processing or machine learning tasks.\n",
    "* [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes)(AKS) is used to deploy and operationalize a machine learning model service API on a Kubernetes cluster. AKS hosts the containerized model, providing scalability that meets throughput requirements, identity and access management, and logging and health monitoring. \n",
    "* [Azure Cosmos DB](https://docs.microsoft.com/en-us/azure/cosmos-db/introduction) is a globally distributed database service used to store the top 10 recommended movies for each user. Azure Cosmos DB is ideal for this scenario as it provides low latency (10 ms at 99th percentile) to read the top recommended items for a given user. \n",
    "* [Azure Machine Learning Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/) is a service used to track and manage machine learning models, and then package and deploy these models to a scalable Azure Kubernetes Service environment.\n",
    "\n",
    "<sup>1) Here, we are just giving an example of using Azure Databricks. Any platforms listed in [SETUP](../../SETUP.md) can be used as well.</sup>\n",
    "\n",
    "\n",
    "### Table of Contents.\n",
    "0. [File Imports](#0-File-Imports)\n",
    "1. [Service Creation](#1-Service-Creation)\n",
    "2. [Training and evaluation](#2-Training)\n",
    "3. [Operationalization](#3.-Operationalize-the-Recommender-Service)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "To run this notebook on Azure Databricks, you should setup Azure Databricks by following the appropriate sections in the repository [SETUP instructions](../../SETUP.md) and import this notebook into your Azure Databricks Workspace (see instructions [here](https://docs.azuredatabricks.net/user-guide/notebooks/notebook-manage.html#import-a-notebook)).\n",
    "\n",
    "Please note: This notebook **REQUIRES** that you add the dependencies to support **operationalization**. See [SETUP](../../SETUP.md) for details.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 0 File Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Azure SDK version: 1.0.69\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "import sys\n",
    "sys.path.append(\"../../\")\n",
    "import urllib\n",
    "\n",
    "from azure.common.client_factory import get_client_from_cli_profile\n",
    "import azure.mgmt.cosmosdb\n",
    "import azureml.core\n",
    "from azureml.core import Workspace\n",
    "from azureml.core.model import Model\n",
    "from azureml.core.compute import AksCompute, ComputeTarget\n",
    "from azureml.core.compute_target import ComputeTargetException\n",
    "from azureml.core.webservice import Webservice, AksWebservice\n",
    "from azureml.exceptions import WebserviceException\n",
    "from azureml.core import Environment\n",
    "from azureml.core.environment import CondaDependencies\n",
    "from azureml.core.model import InferenceConfig\n",
    "from azureml.core.environment import SparkPackage\n",
    "import pydocumentdb.document_client as document_client\n",
    "from pyspark.ml.recommendation import ALS\n",
    "from pyspark.sql.types import StructType, StructField\n",
    "from pyspark.sql.types import FloatType, IntegerType, LongType\n",
    "\n",
    "from reco_utils.common.timer import Timer\n",
    "from reco_utils.common.spark_utils import start_or_get_spark\n",
    "from reco_utils.dataset import movielens\n",
    "from reco_utils.dataset.cosmos_cli import find_collection, read_collection, read_database, find_database\n",
    "from reco_utils.dataset.download_utils import maybe_download\n",
    "from reco_utils.dataset.spark_splitters import spark_random_split\n",
    "from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation\n",
    "from reco_utils.common.notebook_utils import is_databricks\n",
    "\n",
    "print(\"Azure SDK version:\", azureml.core.VERSION)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "\n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://192.168.99.107:4040\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v2.4.3</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>local[*]</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>ALS</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        "
      ],
      "text/plain": [
       "<SparkContext master=local[*] appName=ALS>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# Start spark session if needed\n",
    "if not is_databricks():\n",
    "    cosmos_connector = (\n",
    "        \"https://search.maven.org/remotecontent?filepath=com/microsoft/azure/\"\n",
    "        \"azure-cosmosdb-spark_2.3.0_2.11/1.3.3/azure-cosmosdb-spark_2.3.0_2.11-1.3.3-uber.jar\"\n",
    "    )\n",
    "    jar_filepath = maybe_download(url=cosmos_connector, filename=\"cosmos.jar\")\n",
    "    spark = start_or_get_spark(\"ALS\", memory=\"10g\", jars=[jar_filepath])\n",
    "    sc = spark.sparkContext\n",
    "display(sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1 Service Creation\n",
    "Modify the **Subscription ID** to the subscription you would like to deploy to and set the resource name variables.\n",
    "\n",
    "#### Services created by this notebook:\n",
    "1. [Azure ML Service](https://azure.microsoft.com/en-us/services/machine-learning-service/)\n",
    "    1. [Azure ML Workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace)\n",
    "    1. [Azure Application Insights](https://azure.microsoft.com/en-us/services/monitor/)\n",
    "    1. [Azure Storage](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview)\n",
    "    1. [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/)    \n",
    "\n",
    "1. [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/)\n",
    "1. [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service/)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Add your Azure subscription ID**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Add your subscription ID\n",
    "subscription_id = \"\"\n",
    "\n",
    "# Set your workspace name\n",
    "workspace_name = \"o16n-test\"\n",
    "resource_group = \"{}-rg\".format(workspace_name)\n",
    "\n",
    "# Set your region to deploy Azure ML workspace\n",
    "location = \"eastus\"\n",
    "\n",
    "# AzureML service and Azure Kubernetes Service prefix\n",
    "service_name = \"mvl-als\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Login for Azure CLI so that AzureML can use Azure CLI login credentials\n",
    "!az login"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Change subscription if needed\n",
    "!az account set --subscription {subscription_id}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check account\n",
    "!az account show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# CosmosDB\n",
    "# account_name for CosmosDB cannot have \"_\" and needs to be less than 31 chars\n",
    "account_name = \"{}-ds-sql\".format(workspace_name).replace(\"_\", \"-\")[:31]\n",
    "cosmos_database = \"recommendations\"\n",
    "cosmos_collection = \"user_recommendations_als\"\n",
    "\n",
    "# AzureML resource names\n",
    "model_name = \"{}-reco.mml\".format(service_name)\n",
    "aks_name = \"{}-aks\".format(service_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# top k items to recommend\n",
    "TOP_K = 10\n",
    "\n",
    "# Select MovieLens data size: 100k, 1m, 10m, or 20m\n",
    "MOVIELENS_DATA_SIZE = '100k'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "userCol = \"UserId\"\n",
    "itemCol = \"MovieId\"\n",
    "ratingCol = \"Rating\"\n",
    "\n",
    "train_data_path = \"train\"\n",
    "test_data_path = \"test\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.1 Import or create the AzureML Workspace. \n",
    "This command will check if the AzureML Workspace exists or not, and will create the workspace if it doesn't exist."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "ws = Workspace.create(\n",
    "    name=workspace_name,\n",
    "    subscription_id=subscription_id,\n",
    "    resource_group=resource_group, \n",
    "    location=location,\n",
    "    exist_ok=True\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.2 Create a Cosmos DB to store recommendation results\n",
    "\n",
    "This step will take some time to create CosmosDB resources."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Database created\n",
      "Collection created\n"
     ]
    }
   ],
   "source": [
    "# explicitly pass subscription_id in case user has multiple subscriptions\n",
    "client = get_client_from_cli_profile(\n",
    "    azure.mgmt.cosmosdb.CosmosDB,\n",
    "    subscription_id=subscription_id\n",
    ")\n",
    "\n",
    "async_cosmosdb_create = client.database_accounts.create_or_update(\n",
    "    resource_group,\n",
    "    account_name,\n",
    "    {\n",
    "        'location': location,\n",
    "        'locations': [{\n",
    "            'location_name': location\n",
    "        }]\n",
    "    }\n",
    ")\n",
    "account = async_cosmosdb_create.result()\n",
    "\n",
    "my_keys = client.database_accounts.list_keys(resource_group, account_name)\n",
    "master_key = my_keys.primary_master_key\n",
    "endpoint = \"https://\" + account_name + \".documents.azure.com:443/\"\n",
    "\n",
    "# DB client\n",
    "client = document_client.DocumentClient(endpoint, {'masterKey': master_key})\n",
    "\n",
    "if not find_database(client, cosmos_database):\n",
    "    db = client.CreateDatabase({'id': cosmos_database })\n",
    "    print(\"Database created\")\n",
    "else:\n",
    "    db = read_database(client, cosmos_database)\n",
    "    print(\"Database found\")\n",
    "\n",
    "# Create collection options\n",
    "options = dict(offerThroughput=11000)\n",
    "\n",
    "# Create a collection\n",
    "collection_definition = {\n",
    "    'id': cosmos_collection,\n",
    "    'partitionKey': {'paths': ['/id'],'kind': 'Hash'}\n",
    "}\n",
    "if not find_collection(client, cosmos_database, cosmos_collection):\n",
    "    collection = client.CreateCollection(\n",
    "        db['_self'], \n",
    "        collection_definition,\n",
    "        options\n",
    "    )\n",
    "    print(\"Collection created\")\n",
    "else:\n",
    "    collection = read_collection(client, cosmos_database, cosmos_collection)\n",
    "    print(\"Collection found\")\n",
    "    \n",
    "dbsecrets = dict(\n",
    "    Endpoint=endpoint, \n",
    "    Masterkey=master_key, \n",
    "    Database=cosmos_database, \n",
    "    Collection=cosmos_collection, \n",
    "    Upsert=True\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2 Training\n",
    "\n",
    "Next, we train an [Alternating Least Squares model](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) on [MovieLens](https://grouplens.org/datasets/movielens/) dataset.\n",
    "\n",
    "### 2.1 Download the MovieLens dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 4.81k/4.81k [00:00<00:00, 11.0kKB/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+------+\n",
      "|UserId|MovieId|Rating|\n",
      "+------+-------+------+\n",
      "|   196|    242|   3.0|\n",
      "|   186|    302|   3.0|\n",
      "|    22|    377|   1.0|\n",
      "|   244|     51|   2.0|\n",
      "|   166|    346|   1.0|\n",
      "|   298|    474|   4.0|\n",
      "|   115|    265|   2.0|\n",
      "|   253|    465|   5.0|\n",
      "|   305|    451|   3.0|\n",
      "|     6|     86|   3.0|\n",
      "|    62|    257|   2.0|\n",
      "|   286|   1014|   5.0|\n",
      "|   200|    222|   5.0|\n",
      "|   210|     40|   3.0|\n",
      "|   224|     29|   3.0|\n",
      "|   303|    785|   3.0|\n",
      "|   122|    387|   5.0|\n",
      "|   194|    274|   2.0|\n",
      "|   291|   1042|   4.0|\n",
      "|   234|   1184|   2.0|\n",
      "+------+-------+------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
    "schema = StructType(\n",
    "    (\n",
    "        StructField(userCol, IntegerType()),\n",
    "        StructField(itemCol, IntegerType()),\n",
    "        StructField(ratingCol, FloatType()),\n",
    "    )\n",
    ")\n",
    "\n",
    "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.2 Split the data into train, test\n",
    "There are several ways of splitting the data: random, chronological, stratified, etc., each of which favors a different real-world evaluation use case. We will split randomly in this example – for more details on which splitter to choose, consult [this guide](../01_prepare_data/data_split.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "N train 75031\n",
      "N test 24969\n"
     ]
    }
   ],
   "source": [
    "train, test = spark_random_split(data, ratio=0.75, seed=42)\n",
    "print(\"N train\", train.cache().count())\n",
    "print(\"N test\", test.cache().count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.3 Train the ALS model on the training data\n",
    "\n",
    "To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used to estimate the model are set based on [this page](http://mymedialite.net/examples/datasets.html).\n",
    "\n",
    "Under most circumstances, you would explore the hyperparameters and choose an optimal set based on some criteria. For additional details on this process, please see additional information in the deep dives [here](../04_model_select_and_optimize/tuning_spark_als.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "als = ALS(\n",
    "    rank=10,\n",
    "    maxIter=15,\n",
    "    implicitPrefs=False,\n",
    "    alpha=0.1,\n",
    "    regParam=0.05,\n",
    "    coldStartStrategy='drop',\n",
    "    nonnegative=True,\n",
    "    userCol=userCol,\n",
    "    itemCol=itemCol,\n",
    "    ratingCol=ratingCol,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "model = als.fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.4 Get top-k recommendations for our testing data\n",
    "\n",
    "In the movie recommendation use case, recommending movies that have been rated by the users do not make sense. Therefore, the rated movies are removed from the recommended items.\n",
    "\n",
    "In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+----------+\n",
      "|UserId|MovieId|prediction|\n",
      "+------+-------+----------+\n",
      "|   148|    148| 2.2560365|\n",
      "|   463|    148|  2.936453|\n",
      "|   471|    148| 3.8262048|\n",
      "|   496|    148| 2.2901149|\n",
      "|   833|    148| 1.7296925|\n",
      "|   243|    148| 2.2667758|\n",
      "|   392|    148| 2.4605818|\n",
      "|   540|    148| 3.0631547|\n",
      "|   623|    148| 3.1649487|\n",
      "|   737|    148| 1.7344649|\n",
      "|   858|    148| 1.8472893|\n",
      "|   897|    148| 3.5229573|\n",
      "|    31|    148| 1.9613894|\n",
      "|   516|    148| 3.1411705|\n",
      "|    85|    148| 2.2291098|\n",
      "|   137|    148| 4.0498815|\n",
      "|   251|    148| 3.2075853|\n",
      "|   451|    148|  4.016654|\n",
      "|   580|    148|  2.843738|\n",
      "|   808|    148| 3.4666717|\n",
      "+------+-------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Get the cross join of all user-item pairs and score them.\n",
    "users = train.select(userCol).distinct()\n",
    "items = train.select(itemCol).distinct()\n",
    "user_item = users.crossJoin(items)\n",
    "dfs_pred = model.transform(user_item)\n",
    "dfs_pred.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+----------+\n",
      "|UserId|MovieId|prediction|\n",
      "+------+-------+----------+\n",
      "|     1|    587| 3.4595456|\n",
      "|     1|    869|  2.967618|\n",
      "|     1|   1208|  2.858056|\n",
      "|     1|   1677| 2.9235902|\n",
      "|     2|     80| 3.0129535|\n",
      "|     2|    303| 3.0719132|\n",
      "|     2|    472| 3.4143965|\n",
      "|     2|    582|  4.877232|\n",
      "|     2|    838|  1.529903|\n",
      "|     2|    975| 2.9654517|\n",
      "|     2|   1260|  3.252151|\n",
      "|     2|   1325| 1.1417896|\n",
      "|     2|   1381| 3.7900786|\n",
      "|     2|   1530|  2.625749|\n",
      "|     3|     22| 2.7082264|\n",
      "|     3|     57| 2.5156925|\n",
      "|     3|     89| 3.7927365|\n",
      "|     3|    367| 2.7083492|\n",
      "|     3|   1091| 1.5662774|\n",
      "|     3|   1167| 3.2427955|\n",
      "+------+-------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Remove seen items.\n",
    "dfs_pred_exclude_train = dfs_pred.alias(\"pred\").join(\n",
    "    train.alias(\"train\"),\n",
    "    (dfs_pred[userCol]==train[userCol]) & (dfs_pred[itemCol]==train[itemCol]),\n",
    "    how='outer'\n",
    ")\n",
    "top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[\"train.\"+ratingCol].isNull()) \\\n",
    "    .select(\"pred.\"+userCol, \"pred.\"+itemCol, \"pred.prediction\")\n",
    "\n",
    "top_all.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.5 Evaluate how well ALS performs\n",
    "\n",
    "Evaluate model performance using metrics such as Precision@K, Recall@K, [MAP@K](https://en.wikipedia.org/wiki/Evaluation_measures_\\(information_retrieval\\) or [nDCG@K](https://en.wikipedia.org/wiki/Discounted_cumulative_gain). For a full guide on what metrics to evaluate your recommender with, consult [this guide]../03_evaluate/evaluation.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+------+\n",
      "|UserId|MovieId|Rating|\n",
      "+------+-------+------+\n",
      "|     1|      2|   3.0|\n",
      "|     1|      3|   4.0|\n",
      "|     1|      4|   3.0|\n",
      "|     1|     14|   5.0|\n",
      "|     1|     17|   3.0|\n",
      "|     1|     27|   2.0|\n",
      "|     1|     29|   1.0|\n",
      "|     1|     35|   1.0|\n",
      "|     1|     36|   2.0|\n",
      "|     1|     51|   4.0|\n",
      "|     1|     52|   4.0|\n",
      "|     1|     54|   3.0|\n",
      "|     1|     56|   4.0|\n",
      "|     1|     60|   5.0|\n",
      "|     1|     64|   5.0|\n",
      "|     1|     69|   3.0|\n",
      "|     1|     77|   4.0|\n",
      "|     1|     83|   3.0|\n",
      "|     1|     85|   3.0|\n",
      "|     1|     88|   4.0|\n",
      "+------+-------+------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cols = {\n",
    "    'col_user': userCol,\n",
    "    'col_item': itemCol,\n",
    "    'col_rating': ratingCol,\n",
    "    'col_prediction': \"prediction\",\n",
    "}\n",
    "\n",
    "test.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model:\tALS\n",
      "Top K:\t10\n",
      "MAP:\t0.003698\n",
      "NDCG:\t0.034331\n",
      "Precision@K:\t0.039343\n",
      "Recall@K:\t0.014976\n"
     ]
    }
   ],
   "source": [
    "# Evaluate Ranking Metrics\n",
    "rank_eval = SparkRankingEvaluation(\n",
    "    test, \n",
    "    top_all, \n",
    "    k=TOP_K,\n",
    "    **cols\n",
    ")\n",
    "\n",
    "print(\n",
    "    \"Model:\\tALS\",\n",
    "    \"Top K:\\t%d\" % rank_eval.k,\n",
    "    \"MAP:\\t%f\" % rank_eval.map_at_k(),\n",
    "    \"NDCG:\\t%f\" % rank_eval.ndcg_at_k(),\n",
    "    \"Precision@K:\\t%f\" % rank_eval.precision_at_k(),\n",
    "    \"Recall@K:\\t%f\" % rank_eval.recall_at_k(), sep='\\n'\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model:\tALS rating prediction\n",
      "RMSE:\t0.95\n",
      "MAE:\t0.740282\n",
      "Explained variance:\t0.289807\n",
      "R squared:\t0.285394\n"
     ]
    }
   ],
   "source": [
    "# Evaluate Rating Metrics\n",
    "prediction = model.transform(test)\n",
    "rating_eval = SparkRatingEvaluation(\n",
    "    test, \n",
    "    prediction, \n",
    "    **cols\n",
    ")\n",
    "\n",
    "print(\n",
    "    \"Model:\\tALS rating prediction\",\n",
    "    \"RMSE:\\t%.2f\" % rating_eval.rmse(),\n",
    "    \"MAE:\\t%f\" % rating_eval.mae(),\n",
    "    \"Explained variance:\\t%f\" % rating_eval.exp_var(),\n",
    "    \"R squared:\\t%f\" % rating_eval.rsquared(), sep='\\n'\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.6 Save the model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "(model\n",
    " .write()\n",
    " .overwrite()\n",
    " .save(model_name))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Operationalize the Recommender Service\n",
    "Once the model is built with desirable performance, it will be operationalized to run as a REST endpoint to be utilized by a real time service. We will utilize [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/), [Azure Machine Learning Service](https://azure.microsoft.com/en-us/services/machine-learning-service/), and [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) to operationalize the recommender service."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.1 Create a look-up for Recommendations in Cosmos DB\n",
    "\n",
    "First, the Top-10 recommendations for each user as predicted by the model are stored as a lookup table in Cosmos DB. At runtime, the service will return the Top-10 recommendations as precomputed and stored in Cosmos DB:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+\n",
      "| id|             MovieId|\n",
      "+---+--------------------+\n",
      "|471|[745, 1540, 244, ...|\n",
      "|463|[64, 190, 1286, 3...|\n",
      "|833|[1192, 179, 1524,...|\n",
      "|496|[320, 1589, 262, ...|\n",
      "|148|[1512, 718, 793, ...|\n",
      "|540|[958, 1512, 1368,...|\n",
      "|392|[1643, 1449, 1512...|\n",
      "|243|[285, 251, 1405, ...|\n",
      "|623|[390, 1643, 173, ...|\n",
      "|737|[856, 60, 61, 151...|\n",
      "|897|[1368, 958, 320, ...|\n",
      "|858|[1154, 1129, 853,...|\n",
      "| 31|[1203, 1245, 889,...|\n",
      "|516|[745, 694, 1512, ...|\n",
      "|580|[1368, 958, 1589,...|\n",
      "|251|[1203, 1449, 253,...|\n",
      "|451|[1368, 1019, 958,...|\n",
      "| 85|[1643, 1449, 511,...|\n",
      "|137|[1368, 1643, 958,...|\n",
      "|808|[1512, 867, 1367,...|\n",
      "+---+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recs = model.recommendForAllUsers(10)\n",
    "recs_topk = recs.withColumn(\"id\", recs[userCol].cast(\"string\")) \\\n",
    "    .select(\"id\", \"recommendations.\" + itemCol)\n",
    "recs_topk.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save data to CosmosDB\n",
    "(recs_topk.coalesce(1)\n",
    " .write\n",
    " .format(\"com.microsoft.azure.cosmosdb.spark\")\n",
    " .mode('overwrite')\n",
    " .options(**dbsecrets)\n",
    " .save())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.2 Configure Azure Machine Learning\n",
    "\n",
    "Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a **scoring script** should be created. In the script, we make a call to Cosmos DB to lookup the top 10 movies to recommend given an input User ID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "score_sparkml = \"\"\"\n",
    "import json\n",
    "import pydocumentdb.document_client as document_client\n",
    "\n",
    "def init(local=False):\n",
    "    global client, collection\n",
    "    try:\n",
    "        client = document_client.DocumentClient('{endpoint}', dict(masterKey='{key}'))\n",
    "        collection = client.ReadCollection(collection_link='dbs/{database}/colls/{collection}')\n",
    "    except Exception as e:\n",
    "        collection = e\n",
    "\n",
    "def run(input_json):\n",
    "    try:\n",
    "        # Query them in SQL\n",
    "        id = str(json.loads(json.loads(input_json)[0])['id'])\n",
    "        query = dict(query='SELECT * FROM c WHERE c.id = \"' + id +'\"')\n",
    "        options = dict(partitionKey=str(id))\n",
    "        document_link = 'dbs/{database}/colls/{collection}/docs/' + id\n",
    "        result = client.ReadDocument(document_link, options);  \n",
    "    except Exception as e:\n",
    "        result = str(e)\n",
    "    return json.dumps(str(result))\n",
    "\"\"\".format(key=dbsecrets['Masterkey'], \n",
    "           endpoint=dbsecrets['Endpoint'], \n",
    "           database=dbsecrets['Database'], \n",
    "           collection=dbsecrets['Collection'])\n",
    "\n",
    "# test validity of python string\n",
    "exec(score_sparkml)\n",
    "\n",
    "with open(\"score_sparkml.py\", \"w\") as file:\n",
    "    file.write(score_sparkml)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Register your model:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Registering model mvl-als-reco.mml\n",
      "mvl-als-reco.mml AML trained model 1\n"
     ]
    }
   ],
   "source": [
    "mymodel = Model.register(\n",
    "    model_path=model_name,  # this points to a local file\n",
    "    model_name=model_name,  # this is the name the model is registered as\n",
    "    description=\"AML trained model\",\n",
    "    workspace=ws\n",
    ")\n",
    "\n",
    "print(mymodel.name, mymodel.description, mymodel.version)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.3 Deploy the model as a Service on AKS"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.3.1 Create an Environment for your model:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "env = Environment(name='sparkmlenv')\n",
    "\n",
    "# Specify a public image from microsoft/mmlspark as base image\n",
    "env.docker.base_image=\"microsoft/mmlspark:0.15\"\n",
    "\n",
    "pip = [\n",
    "    'azureml-defaults', \n",
    "    'numpy==1.14.2', \n",
    "    'scikit-learn==0.19.1', \n",
    "    'pandas', \n",
    "    'pydocumentdb'\n",
    "]\n",
    "\n",
    "# Add dependencies needed for inferencing\n",
    "env.python.conda_dependencies = CondaDependencies.create(pip_packages=pip)\n",
    "env.inferencing_stack_version = \"latest\"\n",
    "\n",
    "# Add spark packages\n",
    "env.spark.precache_packages = True\n",
    "env.spark.repositories = [\"https://mmlspark.azureedge.net/maven\"]\n",
    "env.spark.packages= [\n",
    "    SparkPackage(\"com.microsoft.ml.spark\", \"mmlspark_2.11\", \"0.15\"),\n",
    "    SparkPackage(\"com.microsoft.azure\", artifact=\"azure-storage\", version=\"2.0.0\"),\n",
    "    SparkPackage(group=\"org.apache.hadoop\", artifact=\"hadoop-azure\", version=\"2.7.0\")\n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.3.2 Create an AKS Cluster to run your container\n",
    "This may take 20 to 30 minutes depending on the cluster size."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Creating.......................................................................................................\n",
      "SucceededProvisioning operation finished, operation \"Succeeded\"\n",
      "Succeeded\n"
     ]
    }
   ],
   "source": [
    "# Verify that cluster does not exist already\n",
    "try:\n",
    "    aks_target = ComputeTarget(workspace=ws, name=aks_name)\n",
    "    print(\"Found existing cluster, use it.\")\n",
    "except ComputeTargetException:\n",
    "    # Create the cluster using the default configuration (can also provide parameters to customize)\n",
    "    prov_config = AksCompute.provisioning_configuration()\n",
    "    aks_target = ComputeTarget.create(\n",
    "        workspace=ws, \n",
    "        name=aks_name, \n",
    "        provisioning_configuration=prov_config\n",
    "    )\n",
    "    aks_target.wait_for_completion(show_output = True)\n",
    "    print(aks_target.provisioning_state)\n",
    "    # To check any error logs, print(aks_target.provisioning_errors)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.3.3 Deploy the container image to AKS:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running....................................................................................................................\n",
      "SucceededAKS service creation operation finished, operation \"Succeeded\"\n"
     ]
    }
   ],
   "source": [
    "# Create an Inferencing Configuration with your environment and scoring script\n",
    "inference_config = InferenceConfig(\n",
    "    environment=env,\n",
    "    entry_script=\"score_sparkml.py\"\n",
    ")\n",
    "\n",
    "# Set the web service configuration (using default here with app insights)\n",
    "aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)\n",
    "\n",
    "# Webservice creation using single command\n",
    "try:\n",
    "    aks_service = Model.deploy(\n",
    "        workspace=ws,\n",
    "        models=[mymodel],\n",
    "        name=service_name,\n",
    "        inference_config=inference_config,\n",
    "        deployment_config=aks_config,\n",
    "        deployment_target=aks_target\n",
    "    )\n",
    "    aks_service.wait_for_deployment(show_output=True)\n",
    "except WebserviceException:\n",
    "    # Retrieve existing service.\n",
    "    aks_service = Webservice(ws, name=service_name)\n",
    "    print(\"Retrieved existing service\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.4 Call the AKS model service\n",
    "After the deployment, the service can be called with a user ID – the service will then look up the top 10 recommendations for that user in Cosmos DB and send back the results.\n",
    "The following script demonstrates how to call the recommendation service API and view the result for the given user ID:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\n",
      "    \"MovieId\": [\n",
      "        320,\n",
      "        1589,\n",
      "        262,\n",
      "        1344,\n",
      "        958,\n",
      "        889,\n",
      "        1368,\n",
      "        645,\n",
      "        919,\n",
      "        1137\n",
      "    ],\n",
      "    \"id\": \"496\",\n",
      "    \"_rid\": \"34hEAIe9pterAQAAAAAACA==\",\n",
      "    \"_self\": \"dbs/34hEAA==/colls/34hEAIe9ptc=/docs/34hEAIe9pterAQAAAAAACA==/\",\n",
      "    \"_etag\": \"6d006b74-0000-0100-0000-5f25f0550000\",\n",
      "    \"_attachments\": \"attachments/\",\n",
      "    \"_ts\": 1596321877\n",
      "}\n",
      "Full run took 0.05 seconds\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "\n",
    "scoring_url = aks_service.scoring_uri\n",
    "service_key = aks_service.get_keys()[0]\n",
    "\n",
    "input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\"}\"]'.encode()\n",
    "\n",
    "req = urllib.request.Request(scoring_url, data=input_data)\n",
    "req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
    "req.add_header(\"Content-Type\",\"application/json\")\n",
    "\n",
    "with Timer() as t: \n",
    "    with urllib.request.urlopen(req) as result:\n",
    "        res = result.read()\n",
    "        resj = json.loads(\n",
    "            # Cleanup to parse into a json object\n",
    "            res.decode(\"utf-8\")\n",
    "            .replace(\"\\\\\", \"\")\n",
    "            .replace('\"', \"\")\n",
    "            .replace(\"'\", '\"')\n",
    "        )\n",
    "        print(json.dumps(resj, indent=4))\n",
    "    \n",
    "print(\"Full run took %.2f seconds\" % t.interval)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Appendix - Realtime scoring with AzureML\n",
    "\n",
    "In the previous cells, we utilized Cosmos DB to cache the recommendation results for realtime serving. Alternatively, we can generate recommendation results on demand by using the model we deployed. Following scripts load the registered model and use it for recommendation:\n",
    "\n",
    "* *score_sparkml.py*\n",
    "    ```\n",
    "    import json\n",
    "    import os\n",
    "    from pyspark.ml.recommendation import ALSModel\n",
    "\n",
    "    # Note, set `model_name`, `userCol`, and `itemCol` defined earlier.\n",
    "    model_name = \"mvl-als-reco.mml\"\n",
    "    userCol = \"UserId\"\n",
    "    itemCol = \"MovieId\"\n",
    "\n",
    "    def init(local=False):\n",
    "        global model\n",
    "\n",
    "        # Load ALS model.\n",
    "        model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), model_name)\n",
    "        model = ALSModel.load(model_path)\n",
    "\n",
    "    def run(input_json):\n",
    "        js = json.loads(json.loads(input_json)[0])\n",
    "        id = str(js['id'])\n",
    "        k = js.get('k', 10)\n",
    "\n",
    "        # Use the model to get recommendation.\n",
    "        recs = model.recommendForAllUsers(k)\n",
    "        recs_topk = recs.withColumn('id', recs[userCol].cast(\"string\")).select(\n",
    "            'id', \"recommendations.\" + itemCol\n",
    "        )\n",
    "        result = recs_topk[recs_topk.id==id].collect()[0].asDict()\n",
    "\n",
    "        return json.dumps(str(result))\n",
    "    ```\n",
    "\n",
    "* Call the AKS model service\n",
    "    ```\n",
    "    # Get a recommendation of 10 movies\n",
    "    input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\",\\\\\"k\\\\\":10}\"]'.encode()\n",
    "\n",
    "    req = urllib.request.Request(scoring_url, data=input_data)\n",
    "    req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
    "    req.add_header(\"Content-Type\",\"application/json\")\n",
    "    \n",
    "    ...\n",
    "    ```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python (reco_pyspark)",
   "language": "python",
   "name": "reco_pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.0"
  },
  "name": "ALS_Movie_Example",
  "notebookId": 3793436040750096,
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "metadata": {
     "collapsed": false
    },
    "source": []
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
